package com.tpvlog.dfs.namenode.register;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 负责管理集群里的所有DataNode
 *
 * @author Ressmix
 */
public class DataNodeManager {

    // Registration result: node registered for the first time
    public static final Integer REGISTER_FIRST = 10000;

    // Registration result: node already known, registration acknowledged
    public static final Integer REGISTER_SUCCESS_EXIST = 10001;

    // Heartbeat result: node unknown, it must (re-)register first
    public static final Integer RENEW_FIRST = 10003;

    // Heartbeat result: node known, heartbeat timestamp renewed
    public static final Integer RENEW_SUCCESS_EXIST = 10004;

    // A node is considered dead after this long without a heartbeat (ms)
    private static final long HEARTBEAT_TIMEOUT_MS = 90 * 1000L;

    // How often the liveness monitor scans the cluster (ms)
    private static final long MONITOR_INTERVAL_MS = 30 * 1000L;

    // All DataNodes in the cluster, keyed by "IP-HOSTNAME"
    private final Map<String, DataNodeInfo> datanodes = new ConcurrentHashMap<>();

    // filename -> replica locations; plain HashMap, always accessed under rrw
    private final Map<String, List<DataNodeInfo>> datanodeMappedByFile = new HashMap<>();

    // Guards datanodeMappedByFile
    private final ReentrantReadWriteLock rrw = new ReentrantReadWriteLock();

    // Shared RNG for replica selection (java.util.Random is thread-safe;
    // the original re-seeded a new instance per call, which is wasteful and
    // gives correlated sequences under rapid calls)
    private final Random random = new Random();

    public DataNodeManager() {
        // Start the liveness monitor as a daemon thread so it never keeps
        // the JVM alive on shutdown (the original thread was non-daemon).
        DataNodeAliveMonitor monitor = new DataNodeAliveMonitor();
        monitor.setDaemon(true);
        monitor.start();
    }

    /**
     * Registers a DataNode with the cluster.
     *
     * @param ip       node IP, half of the registry key
     * @param hostname node hostname, other half of the registry key
     * @param nioPort  data-transfer port the node listens on
     * @return {@link #REGISTER_SUCCESS_EXIST} if the node was already known,
     *         {@link #REGISTER_FIRST} otherwise
     */
    public Integer register(String ip, String hostname, int nioPort) {
        if (containDataNode(ip, hostname)) {
            System.out.println("当前已经存在这个DataNode了......");
            return REGISTER_SUCCESS_EXIST;
        }
        setDataNodeInfo(ip, hostname, new DataNodeInfo(ip, hostname, nioPort));
        System.out.println("DataNode注册：ip=" + ip + ",hostname=" + hostname + ", nioPort=" + nioPort);
        return REGISTER_FIRST;
    }

    /**
     * Renews a DataNode's heartbeat timestamp.
     *
     * @return {@link #RENEW_FIRST} when the node is unknown (the caller is
     *         expected to trigger a re-registration),
     *         {@link #RENEW_SUCCESS_EXIST} otherwise
     */
    public Integer heartbeat(String ip, String hostname, int nioPort) {
        DataNodeInfo datanode = getDataNodeInfo(ip, hostname);
        if (datanode == null) {
            return RENEW_FIRST;
        }
        datanode.setLatestHeartbeatTime(System.currentTimeMillis());
        System.out.println("DataNode发送心跳：ip=" + ip + ",hostname=" + hostname);
        return RENEW_SUCCESS_EXIST;
    }

    /**
     * Picks the two least-loaded DataNodes to hold the replicas of a new
     * file and charges the file size against them.
     *
     * @param fileSize size in bytes of the file about to be uploaded
     * @return the two selected nodes, or an empty list when fewer than two
     *         nodes are alive (the fixed replica count cannot be satisfied)
     */
    public List<DataNodeInfo> allocateDataNodes(long fileSize) {
        // synchronized so concurrent allocations observe each other's
        // addStoredDataSize() updates and don't both pick the same nodes
        // as "least loaded".
        synchronized (this) {
            List<DataNodeInfo> datanodeList = new ArrayList<>(datanodes.values());
            // DataNodeInfo is assumed Comparable by stored data size — TODO confirm
            Collections.sort(datanodeList);

            List<DataNodeInfo> selectedDatanodes = new ArrayList<>(2);
            if (datanodeList.size() >= 2) {
                selectedDatanodes.add(datanodeList.get(0));
                selectedDatanodes.add(datanodeList.get(1));
                // Account for the new file immediately so the next
                // allocation sees the updated load.
                datanodeList.get(0).addStoredDataSize(fileSize);
                datanodeList.get(1).addStoredDataSize(fileSize);
            }
            return selectedDatanodes;
        }
    }

    /**
     * Returns a randomly chosen DataNode holding a replica of the given
     * file, suitable for a download, or {@code null} when no replica
     * location is known for {@code filename}.
     */
    public DataNodeInfo getDataNodeForFile(String filename) {
        // Lock before try so a failed lock() never reaches the unlock().
        rrw.readLock().lock();
        try {
            List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename);
            // Guard unknown files instead of throwing an NPE.
            if (replicas == null || replicas.isEmpty()) {
                return null;
            }
            return replicas.get(random.nextInt(replicas.size()));
        } finally {
            // BUG FIX: the original called readLock().lock() here, so every
            // call acquired the read lock twice and never released it,
            // eventually deadlocking all writers.
            rrw.readLock().unlock();
        }
    }

    /**
     * Incremental replica report: records that the given DataNode now holds
     * a replica of {@code filename}.
     */
    public void deltaReportDataNodeInfo(String hostname, String ip, String filename) {
        rrw.writeLock().lock();
        try {
            DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
            if (datanode == null) {
                // Unknown node (e.g. evicted after a missed heartbeat):
                // skip instead of storing a null replica entry.
                return;
            }
            List<DataNodeInfo> replicas =
                    datanodeMappedByFile.computeIfAbsent(filename, k -> new ArrayList<>());
            replicas.add(datanode);

            System.out.println("收到DataNode增量上报信息，当前的副本信息为：" + datanodeMappedByFile);
        } finally {
            rrw.writeLock().unlock();
        }
    }

    /**
     * Full replica report (typically sent on node startup): resets the
     * node's stored data size and registers every file it holds.
     */
    public void fullyReportDataNodeInfo(String ip, String hostname, List<String> filenameList, Long storedDataSize) {
        DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
        if (datanode == null) {
            // BUG FIX: the original dereferenced datanode unconditionally
            // and threw an NPE for an unregistered node.
            return;
        }
        datanode.setStoredDataSize(storedDataSize);
        for (String filename : filenameList) {
            this.deltaReportDataNodeInfo(hostname, ip, filename);
        }
    }

    /** Looks up a node by its "IP-HOSTNAME" key; {@code null} if absent. */
    public DataNodeInfo getDataNodeInfo(String ip, String hostname) {
        return datanodes.get(ip + "-" + hostname);
    }

    /** Stores (or replaces) a node under its "IP-HOSTNAME" key. */
    public void setDataNodeInfo(String ip, String hostname, DataNodeInfo dataNodeInfo) {
        datanodes.put(ip + "-" + hostname, dataNodeInfo);
    }

    /** Whether a node with this "IP-HOSTNAME" key is currently registered. */
    public Boolean containDataNode(String ip, String hostname) {
        return datanodes.containsKey(ip + "-" + hostname);
    }



    /*---------------------------------------------PRIVATE METHOD----------------------------------------------*/

    /**
     * Background thread that evicts DataNodes whose last heartbeat is older
     * than {@link #HEARTBEAT_TIMEOUT_MS}, scanning every
     * {@link #MONITOR_INTERVAL_MS}.
     *
     * NOTE(review): eviction does not purge the node from
     * datanodeMappedByFile, so stale replica locations can linger — confirm
     * whether that cleanup happens elsewhere.
     */
    private class DataNodeAliveMonitor extends Thread {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    long now = System.currentTimeMillis();
                    // ConcurrentHashMap's iterator is weakly consistent, so
                    // removing through it during the scan is safe and avoids
                    // the original collect-then-remove second pass.
                    Iterator<DataNodeInfo> it = datanodes.values().iterator();
                    while (it.hasNext()) {
                        DataNodeInfo datanode = it.next();
                        if (now - datanode.getLatestHeartbeatTime() > HEARTBEAT_TIMEOUT_MS) {
                            it.remove();
                        }
                    }
                    Thread.sleep(MONITOR_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop monitoring.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // BUG FIX: the original's try wrapped the whole loop, so
                    // any exception silently killed the monitor forever;
                    // log and keep scanning instead.
                    e.printStackTrace();
                }
            }
        }
    }
}
